# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
from hysop.tools.htypes import check_instance, to_list, first_not_None, to_set
from hysop.tools.decorators import debug, static_vars
from hysop.tools.sympy_utils import subscript
from hysop.constants import Implementation, DirectionLabels, TranspositionState
from hysop.core.graph.graph import generated
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator
from hysop.core.graph.computational_operator import ComputationalGraphNode
[docs]
class DirectionalOperatorBase:
    """
    Implementation interface for directional operators.

    Stores the splitting configuration (dimension, direction and timestep
    coefficient) and constrains the axes of every field requirement
    according to the splitting direction.
    """

    @debug
    def __new__(cls, splitting_dim, splitting_direction, dt_coeff, **kwds):
        # The directional arguments are only consumed by __init__; the
        # remaining keywords are forwarded to the next class in the MRO.
        return super().__new__(cls, **kwds)

    @debug
    def __init__(self, splitting_dim, splitting_direction, dt_coeff, **kwds):
        """
        Create a directional operator in a given direction.

        Parameters
        ----------
        splitting_dim: int
            The dimension of the splitting.
        splitting_direction: int
            Direction of this operator, must satisfy
            0 <= splitting_direction < splitting_dim.
        dt_coeff: float
            Coefficient that should be applied on simulation timestep,
            must be strictly positive.
        kwds:
            Extra keyword arguments forwarded to the next __init__ in the MRO.

        Attributes
        ----------
        splitting_dim: int
            The dimension of the splitting.
        splitting_direction: int
            Direction of this operator.
        dt_coeff: float
            Coefficient that should be applied on simulation timestep.

        Raises
        ------
        ValueError
            If splitting_direction is outside of [0, splitting_dim).
        AssertionError
            If dt_coeff is not strictly positive.
        """
        check_instance(splitting_dim, int)
        check_instance(splitting_direction, int)
        check_instance(dt_coeff, float)

        if not (0 <= splitting_direction < splitting_dim):
            raise ValueError(
                "Bad splitting direction '{}' for splitting dimension {}.".format(
                    splitting_direction, splitting_dim
                )
            )
        assert dt_coeff > 0.0

        self.splitting_dim = splitting_dim
        self.splitting_direction = splitting_direction
        self.dt_coeff = dt_coeff

        super().__init__(**kwds)

    @debug
    def get_field_requirements(self):
        """
        Return the parent field requirements with directional axes imposed.

        The requirements are obtained from the parent class, then the `axes`
        attribute of every input and output requirement is overwritten with
        the entries of TranspositionState[splitting_dim] whose last axis is
        equal to (splitting_dim - 1 - splitting_direction).
        """
        requirements = super().get_field_requirements()

        direction = self.splitting_direction
        dim = self.splitting_dim
        # The label is looked up but not used any further in this method.
        _direction_label = DirectionLabels[direction]

        last_axis = dim - 1 - direction
        candidates = TranspositionState[dim].filter_axes(
            lambda axes: axes[-1] == last_axis
        )
        # NOTE: materializing all candidates might be problematic for hg dimensions
        allowed_axes = tuple(candidates)

        # Inputs are processed first, then outputs; only the requirement
        # object (third tuple member) is modified.
        for iter_requirements in (
            requirements.iter_input_requirements,
            requirements.iter_output_requirements,
        ):
            for _field, _td, req in iter_requirements():
                req.axes = allowed_axes

        return requirements
[docs]
class DirectionalOperatorGeneratorI:
    """
    Interface of generators that build operators direction by direction.

    The generator is first configured with generate(), which records the
    splitting dimension and resets one generation counter per direction.
    Operators are then requested with get_direction() or generate_direction().
    """

    def __new__(cls, **kwds):
        return super().__new__(cls, **kwds)

    def __init__(self, **kwds):
        super().__init__(**kwds)
        self._generated = False
        self._splitting_dim = None

    @abstractmethod
    def generate_only_once_per_direction(self):
        """
        Return a truthy value if at most one operator should be generated
        per direction (see generate_direction).
        """
        pass

    @debug
    def generate(self, splitting_dim, **kwds):
        """
        Record the splitting dimension and reset the per-direction counters.

        If the next class in the MRO provides a generate() method, it is
        called with the remaining keyword arguments and its result is
        returned. Otherwise None is returned.
        """
        self._splitting_dim = splitting_dim
        self._direction_counter = [0] * splitting_dim
        self._generated = True

        # Only the absence of a parent generate() is tolerated. Wrapping the
        # call itself in `except AttributeError` would also hide genuine
        # AttributeErrors raised while the parent is generating.
        parent_generate = getattr(super(), "generate", None)
        if parent_generate is None:
            return None
        return parent_generate(**kwds)

    @abstractmethod
    @generated
    def generate_direction(self, i, dt_coeff):
        """
        Tell whether an operator should be generated in direction i.

        Returns False when generate_only_once_per_direction() is truthy and
        direction i has already been generated. Otherwise the counter of
        direction i is incremented and True is returned.

        Raises
        ------
        ValueError
            If i is outside of [0, splitting_dimension).
        """
        if (i < 0) or (i >= self._splitting_dim):
            msg = "Requested direction is out of bounds (i={}, max_dir={})."
            msg = msg.format(i, self._splitting_dim)
            raise ValueError(msg)
        if self.generate_only_once_per_direction() and (self._direction_counter[i] > 0):
            return False
        self._direction_counter[i] += 1
        return True

    @generated
    def splitting_dimension(self):
        """
        Returns the splitting dimension used to generate this DirectionalOperatorGenerator.
        """
        return self._splitting_dim

    @generated
    def get_direction(self, i, dt_coeff):
        """
        Retrieve operator in direction i.
        """
        return self.generate_direction(i, dt_coeff)
[docs]
class DirectionalOperatorGenerator(DirectionalOperatorGeneratorI, metaclass=ABCMeta):
    """
    Simple ComputationalGraphNodeGenerator to generate an operator in
    multiple directions.
    """

    @debug
    def __new__(
        cls,
        operator,
        base_kwds=None,
        candidate_input_tensors=None,
        candidate_output_tensors=None,
        name=None,
        pretty_name=None,
        **op_kwds,
    ):
        base_kwds = {} if (base_kwds is None) else base_kwds
        return super().__new__(cls, **base_kwds)

    @debug
    def __init__(
        self,
        operator,
        base_kwds=None,
        candidate_input_tensors=None,
        candidate_output_tensors=None,
        name=None,
        pretty_name=None,
        **op_kwds,
    ):
        """
        Initialize a DirectionalOperatorGenerator.

        Parameters
        ----------
        operator: ComputationalGraphNodeGenerator or ComputationalGraphNode
            operator class to be built in each direction.
            operator.__init__ should accept all extra keywords
            arguments op_kwds + 'splitting_direction' + 'splitting_dim'
            + 'dt_coeff' + 'name' + 'pretty_name'.
        base_kwds: dict, optional, defaults to None
            Base class keywords arguments.
            If None, an empty dict will be passed.
        candidate_input_tensors: tuple of fields, optional
            Input tensor fields that should be rebuilt at discretization.
            Fields whose `is_tensor` attribute is falsy are filtered out.
        candidate_output_tensors: tuple of fields, optional
            Output tensor fields that should be rebuilt at discretization.
            Fields whose `is_tensor` attribute is falsy are filtered out.
        name: str, optional
            Base name of the generated operators, defaults to the class name.
        pretty_name: str, optional
            Base pretty name of the generated operators, defaults to name.
        op_kwds:
            Keywords arguments that will be passed towards operator.__init__
            during a call to generate_direction.
        """
        base_kwds = {} if (base_kwds is None) else base_kwds
        super().__init__(**base_kwds)

        candidate_input_tensors = first_not_None(candidate_input_tensors, ())
        candidate_output_tensors = first_not_None(candidate_output_tensors, ())

        self.name = first_not_None(name, type(self).__name__)
        self.pretty_name = first_not_None(pretty_name, self.name)

        self.candidate_input_tensors = {
            field for field in candidate_input_tensors if field.is_tensor
        }
        self.candidate_output_tensors = {
            field for field in candidate_output_tensors if field.is_tensor
        }

        self._operator = operator
        self._op_kwds = op_kwds
        self._node_generators = None
        # NOTE(review): nothing in this class assigns _input_fields_to_dump;
        # presumably it is set elsewhere (subclass or caller) - TODO confirm.
        self._input_fields_to_dump = None
        self._output_fields_to_dump = None

    def dump_outputs(self, fields=None, io_params=None, directions=None, **kwds):
        """
        Tell this operator to dump some of its outputs after
        apply is called.

        The request is only recorded here; it is forwarded to the generated
        operators by generate_direction. This method has to be called before
        generate().

        Parameters
        ----------
        fields: iterable of fields, optional
            Fields to dump. If None, all output fields of each generated
            operator are dumped.
        io_params: optional
            Target folder, file, dump frequency and other io parameters.
            Cannot be specified together with directions.
        directions: int or iterable of int, optional
            Directions in which outputs are dumped, None means all of them.
        kwds:
            Extra keyword arguments forwarded to the dump_outputs method of
            the generated operators.
        """
        assert self._generated is False
        if directions is not None:
            directions = to_list(directions)
            directions = {int(d) for d in directions}
        assert (io_params is None) or (directions is None)
        output_kwds = dict(io_params=io_params)
        output_kwds.update(kwds)
        self._output_fields_to_dump = (fields, directions, output_kwds)

    @debug
    def generate(self, splitting_dim, **extra_kwds):
        """
        Prepare the generation of operators in splitting_dim directions.

        Only the first call has an effect; later calls just check that the
        same splitting_dim is given. extra_kwds and splitting_dim are merged
        into the operator keyword arguments, where keywords given at
        construction take precedence.
        """
        if self._generated:
            assert splitting_dim == self._splitting_dim
            return
        super().generate(splitting_dim=splitting_dim, **extra_kwds)
        op_kwds = extra_kwds.copy()
        op_kwds["splitting_dim"] = splitting_dim
        op_kwds.update(self._op_kwds)
        self._op_kwds = op_kwds

    def generate_only_once_per_direction(self):
        return False

    @generated
    def generate_direction(self, i, dt_coeff):
        """
        Build the operator in direction i.

        Returns an empty tuple when nothing has to be generated (direction
        already generated, or 'mpi_params' is present in the operator
        keywords and its `on_task` attribute is falsy), else the operator
        instance.
        """
        should_generate = super().generate_direction(i=i, dt_coeff=dt_coeff)
        if "mpi_params" in self._op_kwds:
            should_generate = should_generate and self._op_kwds["mpi_params"].on_task
        if not should_generate:
            return ()

        # Work on a copy: popping from self._op_kwds directly would drop a
        # custom 'name'/'pretty_name' for every direction after the first.
        kargs = dict(self._op_kwds)
        basename = kargs.pop("name", self.name)
        basepname = kargs.pop("pretty_name", self.pretty_name)
        kargs.update(self.custom_directional_kwds(i))

        name = f"{basename}_{DirectionLabels[i]}_{self._direction_counter[i]}"
        pname = "{}_{}{}".format(
            basepname, DirectionLabels[i], subscript(self._direction_counter[i])
        )

        try:
            op = self._operator(
                name=name,
                pretty_name=pname,
                splitting_direction=i,
                dt_coeff=dt_coeff,
                **kargs,
            )
        except Exception:
            # Report the operator class itself: `implementation` is not an
            # attribute of this class, reading it here would mask the error.
            sargs = [f"*{k} = {v.__class__}" for (k, v) in kargs.items()]
            msg = f"FATAL ERROR during {self.__class__}.generate():\n"
            msg += f" => failed to initialize an instance of type {self._operator}"
            msg += "\n by using the following keyword arguments:"
            msg += "\n " + "\n ".join(sargs)
            print(f"\n{msg}")
            raise

        self._forward_dump_request(
            self._input_fields_to_dump, op, i, "input_fields", "dump_inputs"
        )
        self._forward_dump_request(
            self._output_fields_to_dump, op, i, "output_fields", "dump_outputs"
        )
        return op

    @staticmethod
    def _forward_dump_request(request, op, direction, fields_attr, dump_method):
        """Forward a recorded (fields, directions, kwds) dump request to op, if any."""
        if request is None:
            return
        (fields, directions, dump_kwds) = request
        if (directions is not None) and (int(direction) not in directions):
            return
        op_fields = getattr(op, fields_attr)
        if fields is not None:
            # Only keep the requested fields that the operator really has.
            fields = set(fields).intersection(op_fields.keys())
        else:
            fields = op_fields.keys()
        if fields:
            getattr(op, dump_method)(fields=fields, **dump_kwds)

    def custom_directional_kwds(self, i):
        """Specify custom keyword arguments that will be passed only for direction i."""
        return {}
[docs]
class DirectionalOperatorFrontend(DirectionalOperatorGenerator, metaclass=ABCMeta):
    """
    Frontend facility for directional operators that provide
    multiple implementations.
    """

    @debug
    def __new__(cls, implementation=None, base_kwds=None, **op_kwds):
        base_kwds = {} if (base_kwds is None) else base_kwds
        return super().__new__(cls, operator=None, base_kwds=base_kwds, **op_kwds)

    @debug
    def __init__(self, implementation=None, base_kwds=None, **op_kwds):
        """
        Initialize a DirectionalOperatorFrontend.

        Parameters
        ----------
        implementation: Implementation, optional, defaults to None
            target implementation, should be contained in available_implementations().
            If None, implementation will be set to default_implementation().
        base_kwds: dict, optional, defaults to None
            Base class keywords arguments.
            If None, an empty dict will be passed.
        op_kwds:
            Keywords arguments that will be passed towards the implementation
            __init__ when a direction is generated.
            In addition to those arguments, the splitting direction and
            splitting_dim will also be passed.
            The candidate_input_tensors and candidate_output_tensors
            arguments of DirectionalOperatorGenerator are passed through
            op_kwds as well.

        Attributes
        ----------
        implementation: Implementation
            the implementation that is used (the default one if None was given).

        Raises
        ------
        TypeError
            If the selected operator class does not inherit from both
            DirectionalOperatorBase and ComputationalGraphNode.
        ValueError
            If implementation is not an available implementation.
        """
        base_kwds = {} if (base_kwds is None) else base_kwds

        check_instance(implementation, Implementation, allow_none=True)
        check_instance(base_kwds, dict, keys=str)

        operator = self._get_implementation(implementation)

        # `operator` is already a class: report it directly, `operator.__class__`
        # would only show its metaclass.
        if not issubclass(operator, DirectionalOperatorBase):
            msg = "Class {} does not inherit from the directional operator interface "
            msg += "({})."
            msg = msg.format(operator, DirectionalOperatorBase)
            raise TypeError(msg)
        if not issubclass(operator, ComputationalGraphNode):
            msg = (
                "Class {} does not inherit from the computational graph node interface"
            )
            msg += "({})."
            msg = msg.format(operator, ComputationalGraphNode)
            raise TypeError(msg)

        super().__init__(operator=operator, base_kwds=base_kwds, **op_kwds)

        # Store the implementation really in use, so that the attribute is
        # always an Implementation as documented.
        if implementation is None:
            implementation = self.default_implementation()
        self.implementation = implementation

    def _get_implementation(self, implementation):
        """
        Make some checks and return the operator implementation corresponding
        to chosen implementation.

        Raises
        ------
        TypeError
            If the default implementation or one of the available
            implementations is not an Implementation instance.
        ValueError
            If the default implementation is not available, or if the
            requested implementation is not available.
        """
        default_implementation = self.default_implementation()
        available_implementations = self.available_implementations()

        if not isinstance(default_implementation, Implementation):
            msg = "default_implementation is not a instance of hysop.implementation.Implementation."
            raise TypeError(msg)
        for b in available_implementations:
            if not isinstance(b, Implementation):
                msg = "{} is not a instance of hysop.implementation.Implementation."
                msg = msg.format(b)
                raise TypeError(msg)
        if default_implementation not in available_implementations:
            msg = (
                "default_implementation is not contained in available_implementations."
            )
            raise ValueError(msg)

        if implementation is None:
            implementation = default_implementation
        elif implementation not in available_implementations:
            # List the default implementation first, then all the others.
            simplementations = [f"-{default_implementation} (default)"]
            simplementations.extend(
                f"-{b}" for b in available_implementations if b != default_implementation
            )
            msg = "\nSpecified implementation '{}' is not an available implementation\n"
            msg += "for operator {}, available implementations are:\n {}"
            msg = msg.format(
                implementation, self.__class__.__name__, "\n ".join(simplementations)
            )
            raise ValueError(msg)

        return self.implementations()[implementation]

    # available_implementations() calls implementations() on the class itself,
    # hence both abstract hooks below are declared as class methods.
    @classmethod
    @abstractmethod
    def implementations(cls):
        """
        Should return all implementations as a dictionary.
        Keys are Implementation instances and values are either ComputationalGraphNode
        or ComputationalGraphNodeGenerator.
        """
        pass

    @classmethod
    @abstractmethod
    def default_implementation(cls):
        """
        Return the default Implementation, should be compatible with available_implementations.
        """
        pass

    @classmethod
    def available_implementations(cls):
        """
        Return all available implementations.
        """
        return cls.implementations().keys()

    def _get_generated(self):
        return self._generated

    # Read-only view on the generation state set by generate().
    generated = property(_get_generated)